package com.atuguigu.flink.Day05;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Interval-based join.
// Joins each user's click with that user's browse events from the preceding 10 minutes.
/**
 * Interval-join demo on two keyed event-time streams.
 *
 * <p>A stream of click events is joined with a stream of browse events for the same
 * {@code userId}. The join is expressed twice, once from each side:
 * <ul>
 *   <li>click joined with browse, bounds {@code [-10 min, 0]} relative to the click;</li>
 *   <li>browse joined with click, bounds {@code [0, +10 min]} relative to the browse.</li>
 * </ul>
 * Both results are printed as {@code left=>right}.
 */
public class Example4 {

    /**
     * Builds the two sample streams, wires both interval joins and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // A single click at minute 20.
        SingleOutputStreamOperator<Event> clickStream = env
                .fromElements(
                        new Event("user1", "click", 20 * 60 * 1000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        // Browse events at minutes 2, 15, 50 and 16 (in that emission order).
        // NOTE(review): the last element is older than the one before it, while the
        // watermark strategy is declared for monotonous timestamps; confirm this
        // out-of-order element is intentional for the demo.
        SingleOutputStreamOperator<Event> browseStream = env
                .fromElements(
                        new Event("user1", "browse", 2 * 60 * 1000L),
                        new Event("user1", "browse", 15 * 60 * 1000L),
                        new Event("user1", "browse", 50 * 60 * 1000L),
                        new Event("user1", "browse", 16 * 60 * 1000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        // Click on the left: pair it with browses of the same user in the 10 minutes
        // up to the click's timestamp.
        clickStream
                .keyBy(r -> r.userId)
                .intervalJoin(browseStream.keyBy(r -> r.userId))
                .between(Time.minutes(-10), Time.minutes(0))
                .process(new PairFormatter())
                .print();

        // Same relationship seen from the browse side: pair each browse with clicks of
        // the same user in the 10 minutes starting at the browse's timestamp.
        browseStream
                .keyBy(r -> r.userId)
                .intervalJoin(clickStream.keyBy(r -> r.userId))
                .between(Time.minutes(0), Time.minutes(10))
                .process(new PairFormatter())
                .print();

        env.execute();
    }

    /**
     * Creates the watermark strategy shared by both sample streams: monotonous-timestamp
     * watermarks with the event time taken from {@link Event#timestamp}.
     *
     * @return a new strategy instance
     */
    private static WatermarkStrategy<Event> eventTimeStrategy() {
        return WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                });
    }

    /** Emits every joined pair as the string {@code left=>right}. */
    private static class PairFormatter extends ProcessJoinFunction<Event, Event, String> {
        @Override
        public void processElement(Event left, Event right, Context ctx, Collector<String> out) throws Exception {
            out.collect(left + "=>" + right);
        }
    }

    /**
     * A user action with an event-time timestamp.
     *
     * <p>Fields are public and a no-argument constructor is provided; presumably this is
     * so the class can be handled as a Flink POJO type (verify against the Flink docs).
     */
    public static class Event {
        /** Identifier of the user; used as the join key. */
        public String userId;
        /** Kind of action; the sample data uses {@code "click"} and {@code "browse"}. */
        public String eventType;
        /** Event time; the sample data expresses it in milliseconds. */
        public Long timestamp;

        /** Creates an event with all fields unset. */
        public Event() {
        }

        /**
         * Creates a fully populated event.
         *
         * @param userId    identifier of the user
         * @param eventType kind of action
         * @param timestamp event time
         */
        public Event(String userId, String eventType, Long timestamp) {
            this.userId = userId;
            this.eventType = eventType;
            this.timestamp = timestamp;
        }

        /** Returns a readable form listing every field under its own name. */
        @Override
        public String toString() {
            return "Event{" +
                    "userId='" + userId + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }
}
